Go 并发编程-6.Sync.Cond

Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

基本用法

标准库中的 Cond 并发原语初始化的时候,需要关联一个 Locker 接口的实例,一般我们使用 Mutex 或者 RWMutex。

1
2
3
4
5
type Cond
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

Cond 关联的 Locker 实例可以通过 c.L 访问

Signal 方法,允许调用者 Caller 唤醒一个等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则需要从等待队列中移除第一个 goroutine 并把它唤醒。

Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒。

Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒。

使用示例

10 个运动员进入赛场之后需要先做拉伸活动活动筋骨,向观众和粉丝招手致敬,在自己的赛道上做好准备;等所有的运动员都准备好之后,裁判员才会打响发令枪。

每个运动员做好准备之后,将 ready 加一,表明自己做好准备了,同时调用 Broadcast 方法通知裁判员。因为裁判员只有一个,所以这里可以直接替换成 Signal 方法调用。调用 Broadcast 方法的时候,我们并没有请求 c.L 锁,只是在更改等待变量的时候才使用到了锁。

裁判员会等待运动员都准备好(c.Wait())。虽然每个运动员准备好之后都唤醒了裁判员,但是裁判员被唤醒之后需要检查等待条件是否满足(运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock() // 加锁
ready++
c.L.Unlock() // 解锁

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock() // 加锁
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
c.L.Unlock() // 解锁

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

type Cond struct {
noCopy noCopy

// 当观察或者修改等待条件的时候需要加锁
L Locker

// 等待队列
notify notifyList
// 辅助结构,可以在运行时检查 Cond 是否被复制使用
checker copyChecker
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

func (c *Cond) Wait() {
c.checker.check()
// 增加到等待队列中
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock() // 注意这里是先解锁,再加锁,所以在使用 Wait 的时,需要先在外面加锁
// 阻塞休眠直到被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

Wait 把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。在阻塞休眠期间,调用者是不持有锁的,这样能让其他 goroutine 有机会检查或者更新等待变量。也就是上面的 runtime_notifyListWait(&c.notify, t) 部分。

常见错误

调用 Wait 的时候没有加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

// c.L.Lock() 去掉了锁
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
// c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

上面代码运行,就会报释放未加锁的 panic。

出现这个问题的原因在于,cond.Wait 方法的实现是,把当前调用者加入到 notify 队列之中后会释放锁(如果不释放锁,其他 Wait 的调用者就没有机会加入到 notify 队列中了),然后一直等待;等调用者被唤醒之后,又会去争抢这把锁。如果调用 Wait 之前不加锁的话,就有可能 Unlock 一个未加锁的 Locker。所以切记,调用 cond.Wait 方法之前一定要加锁。

没有检查等待条件是否满足

只调用了一次 Wait,没有检查等待条件是否满足,结果条件没满足,程序就继续执行了。出现这个问题的原因在于,误以为 Cond 的使用,就像 WaitGroup 那样调用一下 Wait 方法等待那么简单。比如下面的代码中。将 for 循环注释掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock()
// for ready != 10 { 注释掉,没有检查等待条件是否满足,直接 Wait()一次就结束了
c.Wait()
log.Println("裁判员被唤醒一次")
// }
c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

运行这个程序,会发现,可能只有几个运动员准备好之后程序就运行完了,而不是我们期望的所有运动员都准备好才进行下一步。

原因在于,每一个运动员准备好之后都会唤醒所有的等待者,也就是这里的裁判员,比如第一个运动员准备好后就唤醒了裁判员,结果这个裁判员傻傻地没做任何检查,以为所有的运动员都准备好了,就继续执行了。

所以,waiter goroutine 被唤醒不等于等待条件被满足,只是有 goroutine 把它唤醒了而已,等待条件有可能已经满足了,也有可能不满足,我们需要进一步检查。你也可以理解为,等待者被唤醒,只是得到了一次检查的机会而已。

总结

img